@vanekjar vanekjar commented Oct 23, 2025

What changes were proposed in this pull request?

This PR improves the Spark SQL optimizer’s InferFiltersFromConstraints rule to infer filter conditions from join constraints that involve complex expressions, not just simple attribute equalities.

Currently, the optimizer can only infer additional constraints when the join condition is a simple equality (e.g., a = b). For more complex expressions, such as arithmetic operations, it does not infer the corresponding filter.

Example (currently works as expected):

SELECT *
FROM t1
JOIN t2 ON t1.a = t2.b
WHERE t2.b = 1

In this case, the optimizer correctly infers the additional constraint t1.a = 1.

Example (now handled by this PR):

SELECT *
FROM t1
JOIN t2 ON t1.a = t2.b + 2
WHERE t2.b = 1

Here, it is clear that t1.a = 3 (since t2.b = 1 and t1.a = t2.b + 2), but previously the optimizer did not infer this constraint. With this change, the optimizer can now deduce and push down t1.a = 3.
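The inference described above can be sketched in two steps: substitute known attribute-to-literal bindings into the join condition, then constant-fold the result. This is a minimal standalone illustration on a toy expression tree, not the actual Catalyst implementation; all type and function names here are invented for the example.

```scala
// Toy expression ADT standing in for Catalyst expressions (illustrative only).
sealed trait Expr
case class Attr(name: String) extends Expr
case class Lit(value: Int) extends Expr
case class Add(left: Expr, right: Expr) extends Expr
case class EqualTo(left: Expr, right: Expr) extends Expr

// Step 1: replace attributes with literals known from filter constraints (e.g. b = 1).
def substitute(e: Expr, known: Map[String, Int]): Expr = e match {
  case Attr(n) if known.contains(n) => Lit(known(n))
  case Add(l, r)                    => Add(substitute(l, known), substitute(r, known))
  case EqualTo(l, r)                => EqualTo(substitute(l, known), substitute(r, known))
  case other                        => other
}

// Step 2: fold fully-literal subtrees, mirroring what ConstantFolding does.
def fold(e: Expr): Expr = e match {
  case Add(l, r) => (fold(l), fold(r)) match {
    case (Lit(a), Lit(b)) => Lit(a + b)
    case (fl, fr)         => Add(fl, fr)
  }
  case EqualTo(l, r) => EqualTo(fold(l), fold(r))
  case other         => other
}

// Join condition t1.a = t2.b + 2, combined with the filter constraint t2.b = 1.
val joinCond = EqualTo(Attr("a"), Add(Attr("b"), Lit(2)))
val inferred = fold(substitute(joinCond, Map("b" -> 1)))
// inferred is a = 3, a new filter that can be pushed down to t1.
```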

How was this patch tested?

You can reproduce and verify the improvement with the following:

spark.sql("CREATE TABLE t1(a INT)")
spark.sql("CREATE TABLE t2(b INT)")

spark.sql("""
SELECT * 
FROM t1 
INNER JOIN t2 ON t2.b = t1.a + 2 
WHERE t1.a = 1
""").explain

Before this change, the physical plan does not include the inferred filter:

== Physical Plan ==
AdaptiveSparkPlan
+- BroadcastHashJoin [(a#2 + 2)], [b#3], Inner, BuildRight, false
   :- Filter (isnotnull(a#2) AND (a#2 = 1))
   :  +- FileScan spark_catalog.default.t1[a#2]
   +- Filter isnotnull(b#3)
      +- FileScan spark_catalog.default.t2[b#3]

With this PR, the optimizer should infer and push down t2.b = 3 as an additional filter.

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [(a#2 + 2)], [b#3], Inner, BuildRight, false
   :- Filter (isnotnull(a#2) AND (a#2 = 1))
   :  +- FileScan spark_catalog.default.t1[a#2]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=27]
      +- Filter ((b#3 = 3) AND isnotnull(b#3))
         +- FileScan spark_catalog.default.t2[b#3]

Why are the changes needed?

Without this enhancement, the optimizer cannot push down filters or optimize query execution plans for queries with complex join conditions, which can lead to suboptimal join performance.

@github-actions github-actions bot added the SQL label Oct 23, 2025
@vanekjar vanekjar changed the title [SPARK-53996][SQL] Improve InferFiltersFromConstraints to infers from complex join expressions [SPARK-53996][SQL] Improve InferFiltersFromConstraints to infer filters from complex join expressions Oct 23, 2025
Contributor

@andylam-db andylam-db left a comment


I think what you're trying to do is to propagate literals in InferFiltersFromConstraints.

The crux of the problem is:

  1. InferFiltersFromConstraints DOES consider join conditions and expressions across multiple operators, but it does not consider literals.
  2. ConstantPropagation considers literals, but does not consider operators outside a single Filter node.

Can we just reuse the logic in ConstantPropagation (which is more robust and historically tested) in InferFiltersFromConstraints.getAllConstraints?

Comment on lines 116 to 121
// Avoid inferring tautologies like 1 = 1
val isTautology = replaced match {
  case EqualTo(left: Expression, right: Expression) if left.foldable && right.foldable =>
    left.eval() == right.eval()
  case _ => false
}

This is a little complicated and potentially non-performant given that we have to do expression evaluation in the driver during compilation.

Author

@vanekjar vanekjar Oct 31, 2025


Thanks for the suggestion above, I'll try to reuse existing code as much as possible.

Regarding performance: expression evaluation already happens in the driver in the ConstantFolding rule.

object ConstantFolding extends Rule[LogicalPlan] {
  // This tag is for avoid repeatedly evaluating expression inside conditional expression
  // which has already failed to evaluate before.
  private[sql] val FAILED_TO_EVALUATE = TreeNodeTag[Unit]("FAILED_TO_EVALUATE")

  private def hasNoSideEffect(e: Expression): Boolean = e match {
    case _: Attribute => true
    case _: Literal => true
    case c: Cast if !conf.ansiEnabled => hasNoSideEffect(c.child)
    case _: NoThrow if e.deterministic => e.children.forall(hasNoSideEffect)
    case _ => false
  }

  private def tryFold(expr: Expression, isConditionalBranch: Boolean): Expression = {
    try {
      Literal.create(expr.freshCopyIfContainsStatefulExpression().eval(EmptyRow), expr.dataType)


vanekjar commented Nov 4, 2025

@andylam-db Thanks for your suggestion, I simplified the code to reuse logic from the ConstantPropagation rule. I introduced ConstantPropagationHelper to make the logic reusable across optimizer rules.
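One way such a shared helper could be shaped (the names and signature below are illustrative guesses, not the actual ConstantPropagationHelper API from the PR): a single place that extracts attribute-to-literal bindings from a conjunction of predicates, which both ConstantPropagation and InferFiltersFromConstraints could then consume.

```scala
// Toy predicate model standing in for Catalyst predicates (illustrative only).
sealed trait Pred
case class EqAttrLit(attr: String, value: Int) extends Pred
case class Other(desc: String) extends Pred

// Hypothetical shared helper: collect bindings like b -> 1 from predicates
// of the form `attr = literal`, ignoring anything else.
object ConstantPropagationHelper {
  def constantBindings(predicates: Seq[Pred]): Map[String, Int] =
    predicates.collect { case EqAttrLit(a, v) => a -> v }.toMap
}

// For the PR's example, the filter t2.b = 1 yields the binding b -> 1,
// which can then be substituted into the join condition t1.a = t2.b + 2.
val predicates = Seq(EqAttrLit("b", 1), Other("isnotnull(b)"))
val bindings = ConstantPropagationHelper.constantBindings(predicates)
```

Factoring the binding extraction out this way keeps the literal-handling logic in one historically tested place, per the review suggestion above.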

@vanekjar vanekjar requested a review from andylam-db November 5, 2025 01:26